package com.bfxy.disruptor.common;

import com.bfxy.disruptor.common.balance.Sequence;
import com.bfxy.disruptor.common.balance.SequenceBarrier;
import com.bfxy.disruptor.common.event.*;
import com.bfxy.disruptor.common.producer.MultiProducer;
import com.bfxy.disruptor.common.producer.Productor;
import com.bfxy.disruptor.common.producer.SingleProducer;
import com.bfxy.disruptor.common.waitStraty.BlockingWaitStrategy;
import com.bfxy.disruptor.common.waitStraty.WaitStrategy;

import sun.misc.Unsafe;

/**
 * Base of the ring buffer class hierarchy that declares seven {@code long}
 * fields and nothing else.
 *
 * The fields are never read or written in this file; presumably they exist
 * only to occupy space ahead of the subclass fields (cache-line padding) —
 * confirm before removing or reordering them.
 */
abstract class RingBufferPad {
	protected long p1;
	protected long p2;
	protected long p3;
	protected long p4;
	protected long p5;
	protected long p6;
	protected long p7;
}

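// The RingBuffer holds the producer-side sequencer (not a user-defined producer); the sequencer is the bridge
// to the producers, and most RingBuffer operations are carried out by delegating to it.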

/**
 * Holds the backing array of pre-allocated events together with the sequencer
 * that coordinates access to it.
 *
 * The array is over-allocated by {@code BUFFER_PAD} unused slots on each side of
 * the live entries, so that the live references are separated from whatever the
 * JVM places next to the array (false-sharing mitigation).
 *
 * @param <E> type of the events stored in the buffer
 */
abstract class RingBufferFields<E> extends RingBufferPad {
	/** Number of unused reference slots kept on EACH side of the live entries (128 bytes worth). */
	private static final int BUFFER_PAD;
	/** Byte offset, from the start of the array object, of the first live entry (array base + leading pad). */
	private static final long REF_ARRAY_BASE;
	/** log2 of the size in bytes of one reference slot; used to turn an index into a byte offset. */
	private static final int REF_ELEMENT_SHIFT;
	private static final Unsafe UNSAFE = Util.getUnsafe();
	static {
		final int scale = UNSAFE.arrayIndexScale(Object[].class);
		if (4 == scale) {
			REF_ELEMENT_SHIFT = 2;
		} else if (8 == scale) {
			REF_ELEMENT_SHIFT = 3;
		} else {
			throw new IllegalStateException("Unknown pointer size");
		}
		// 128 bytes of padding expressed in slots: 32 slots for 4-byte references, 16 for 8-byte ones.
		// A fixed pad of 4 slots (16-32 bytes) is too small to isolate the entries from neighbouring data.
		BUFFER_PAD = 128 / scale;
		// Including the buffer pad in the array base offset
		REF_ARRAY_BASE = UNSAFE.arrayBaseOffset(Object[].class) + (BUFFER_PAD << REF_ELEMENT_SHIFT);
	}

	/** {@code bufferSize - 1}; because bufferSize is a power of two, {@code sequence & indexMask} is a cheap modulo. */
	private final long indexMask;
	/** Backing storage of the ring buffer: leading pad, then bufferSize events, then trailing pad. */
	private final Object[] entries;
	/** Capacity of the ring buffer, as reported by the sequencer. */
	protected final int bufferSize;

	/**
	 * Producer-side sequencer (single- or multi-producer implementation). It is created by the
	 * RingBuffer factory methods and carries the buffer size and the wait strategy.
	 */
	protected final Productor sequencer;

	/**
	 * Allocates the padded backing array and pre-populates every live slot.
	 *
	 * @param eventFactory factory invoked once per slot to create the pre-allocated events
	 * @param sequencer    sequencer that supplies the buffer size and coordinates access
	 * @throws IllegalArgumentException if the buffer size is less than 1 or not a power of 2
	 */
	RingBufferFields(EventFactory<E> eventFactory, Productor sequencer) {
		this.sequencer = sequencer;
		this.bufferSize = sequencer.getBufferSize();

		if (bufferSize < 1) {
			throw new IllegalArgumentException("bufferSize must not be less than 1");
		}
		if (Integer.bitCount(bufferSize) != 1) {
			throw new IllegalArgumentException("bufferSize must be a power of 2");
		}
		// The mask replaces a modulo operation when mapping a sequence onto a slot.
		this.indexMask = bufferSize - 1;
		// Pad BOTH ends of the array: BUFFER_PAD unused slots before the first entry and after the last one.
		this.entries = new Object[bufferSize + 2 * BUFFER_PAD];
		fill(eventFactory);// pre-allocate all events up front
	}

	/**
	 * Creates one event per live slot; the padding slots on either side stay {@code null}.
	 *
	 * @param eventFactory factory used to create each event
	 */
	private void fill(EventFactory<E> eventFactory) {
		for (int i = 0; i < bufferSize; i++) {
			// Live entries start right after the leading pad.
			entries[BUFFER_PAD + i] = eventFactory.newInstance();
		}
	}

	/**
	 * Returns the pre-allocated event stored in the slot that {@code sequence} maps to.
	 *
	 * @param sequence sequence number; it is wrapped onto the buffer with {@code indexMask}
	 * @return the event held in that slot
	 */
	@SuppressWarnings({ "unchecked", "restriction" })
	protected final E elementAt(long sequence) {
		// REF_ARRAY_BASE already skips the leading pad, so only the wrapped index has to be scaled to bytes.
		return (E) UNSAFE.getObject(entries, REF_ARRAY_BASE + ((sequence & indexMask) << REF_ELEMENT_SHIFT));
	}
}

/**
 * Ring buffer of pre-allocated events. Almost every operation is delegated to the
 * producer-side {@code sequencer} inherited from {@link RingBufferFields}; this class
 * adds the factory methods and the translate-and-publish convenience API.
 *
 * @param <E> type of the events stored in the buffer
 */
public final class RingBuffer<E> extends RingBufferFields<E> implements Cursored, EventSequencer<E>, EventSink<E> {
	/** Initial cursor value, taken from {@link Sequence#INITIAL_VALUE} (-1 according to the original author's note). */
	public static final long INITIAL_CURSOR_VALUE = Sequence.INITIAL_VALUE;
	// Never read or written in this file; presumably trailing padding after the inherited fields — confirm before removing.
	protected long p1, p2, p3, p4, p5, p6, p7;

	/**
	 * Builds a ring buffer around the given sequencer.
	 *
	 * @param eventFactory factory used to pre-allocate the events
	 * @param sequencer    single- or multi-producer sequencer
	 * @throws IllegalArgumentException if the sequencer's buffer size is less than 1 or not a power of 2
	 */
	RingBuffer(EventFactory<E> eventFactory, Productor sequencer) {
		super(eventFactory, sequencer);
	}

	/**
	 * Creates a ring buffer backed by a {@link MultiProducer} sequencer.
	 *
	 * @param factory      factory used to pre-allocate the events
	 * @param bufferSize   number of slots; must be a power of 2
	 * @param waitStrategy wait strategy handed to the sequencer
	 * @return the new ring buffer
	 */
	public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize,
			WaitStrategy waitStrategy) {
		// The sequencer carries the buffer size and the wait strategy; one instance is shared by the whole buffer.
		MultiProducer sequencer = new MultiProducer(bufferSize, waitStrategy);
		return new RingBuffer<E>(factory, sequencer);
	}

	/** Same as {@link #createMultiProducer(EventFactory, int, WaitStrategy)} with a {@link BlockingWaitStrategy}. */
	public static <E> RingBuffer<E> createMultiProducer(EventFactory<E> factory, int bufferSize) {
		return createMultiProducer(factory, bufferSize, new BlockingWaitStrategy());
	}

	/**
	 * Creates a ring buffer backed by a {@link SingleProducer} sequencer.
	 *
	 * @param factory      factory used to pre-allocate the events
	 * @param bufferSize   number of slots; must be a power of 2
	 * @param waitStrategy wait strategy handed to the sequencer
	 * @return the new ring buffer
	 */
	public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize,
			WaitStrategy waitStrategy) {
		// The same sequencer instance is later handed to the barriers created through newBarrier().
		SingleProducer sequencer = new SingleProducer(bufferSize, waitStrategy);
		return new RingBuffer<E>(factory, sequencer);
	}

	/** Same as {@link #createSingleProducer(EventFactory, int, WaitStrategy)} with a {@link BlockingWaitStrategy}. */
	public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize) {
		return createSingleProducer(factory, bufferSize, new BlockingWaitStrategy());
	}

	/**
	 * Creates a ring buffer whose sequencer implementation is selected by {@code producerType}.
	 *
	 * @param producerType {@code SINGLE} or {@code MULTI}
	 * @param factory      factory used to pre-allocate the events
	 * @param bufferSize   number of slots; must be a power of 2
	 * @param waitStrategy wait strategy handed to the sequencer
	 * @return the new ring buffer
	 * @throws IllegalStateException if the producer type is neither {@code SINGLE} nor {@code MULTI}
	 */
	public static <E> RingBuffer<E> create(ProducerType producerType, EventFactory<E> factory, int bufferSize,
			WaitStrategy waitStrategy) {
		// Single- and multi-producer modes use different sequencer implementations.
		switch (producerType) {
		case SINGLE:
			return createSingleProducer(factory, bufferSize, waitStrategy);
		case MULTI:
			return createMultiProducer(factory, bufferSize, waitStrategy);
		default:
			throw new IllegalStateException(producerType.toString());
		}
	}

	/**
	 * Returns the pre-allocated event stored in the slot that {@code sequence} maps to.
	 *
	 * @param sequence sequence of the event
	 * @return the event in that slot
	 */
	@Override
	public E get(long sequence) {
		return elementAt(sequence);
	}

	/** Obtains the next sequence of the ring buffer from the sequencer. */
	@Override
	public long next() {
		return sequencer.next();
	}

	/** Obtains a batch of {@code n} sequences from the sequencer; see {@code Productor.next(int)} for the return value. */
	@Override
	public long next(int n) {
		return sequencer.next(n);
	}

	/**
	 * Delegates to {@code sequencer.tryNext()}.
	 *
	 * @throws InsufficientCapacityException propagated from the sequencer
	 */
	@Override
	public long tryNext() throws InsufficientCapacityException {
		return sequencer.tryNext();
	}

	/**
	 * Delegates to {@code sequencer.tryNext(n)}.
	 *
	 * @throws InsufficientCapacityException propagated from the sequencer
	 */
	@Override
	public long tryNext(int n) throws InsufficientCapacityException {
		return sequencer.tryNext(n);
	}

	/**
	 * Claims the given sequence on the sequencer and immediately publishes it.
	 *
	 * @param sequence sequence to claim and publish
	 */
	@Deprecated
	public void resetTo(long sequence) {
		sequencer.claim(sequence);
		sequencer.publish(sequence);
	}

	/**
	 * Claims the given sequence on the sequencer and returns the event stored in its slot,
	 * without publishing it.
	 *
	 * @param sequence sequence to claim
	 * @return the pre-allocated event for that sequence
	 */
	public E claimAndGetPreallocated(long sequence) {
		sequencer.claim(sequence);
		return get(sequence);
	}

	/** Returns {@code sequencer.isAvailable(sequence)}. */
	public boolean isPublished(long sequence) {
		return sequencer.isAvailable(sequence);
	}

	/** Registers gating sequences with the sequencer (per the original note, these are the consumers' sequences). */
	public void addGatingSequences(Sequence... gatingSequences) {
		sequencer.addGatingSequences(gatingSequences);
	}

	/**
	 * Returns the minimum sequence reported by the sequencer; per the original note this is the
	 * slowest consumer's position, which bounds how far producers may advance.
	 */
	public long getMinimumGatingSequence() {
		return sequencer.getMinimumSequence();
	}

	/** Removes a gating sequence from the sequencer and returns the sequencer's result. */
	public boolean removeGatingSequence(Sequence sequence) {
		return sequencer.removeGatingSequence(sequence);
	}

	/** Asks the sequencer to create a sequence barrier that tracks the given sequences. */
	public SequenceBarrier newBarrier(Sequence... sequencesToTrack) {
		return sequencer.newBarrier(sequencesToTrack);
	}

	/** Asks the sequencer to create an event poller over this ring buffer, gated on the given sequences. */
	public EventPoller<E> newPoller(Sequence... gatingSequences) {
		return sequencer.newPoller(this, gatingSequences);
	}

	/** Returns the ring buffer's cursor, which is maintained by the sequencer. */
	@Override
	public long getCursor() {
		return sequencer.getCursor();
	}

	/** Returns the capacity of the ring buffer. */
	public int getBufferSize() {
		return bufferSize;
	}

	/** Returns {@code sequencer.hasAvailableCapacity(requiredCapacity)}. */
	public boolean hasAvailableCapacity(int requiredCapacity) {
		return sequencer.hasAvailableCapacity(requiredCapacity);
	}

	/** Obtains the next sequence, lets the translator fill the event in that slot, then publishes it. */
	@Override
	public void publishEvent(EventTranslator<E> translator) {
		final long sequence = sequencer.next();
		translateAndPublish(translator, sequence);
	}

	/**
	 * Like {@link #publishEvent(EventTranslator)} but uses {@code tryNext()}.
	 *
	 * @return {@code false} if the sequencer threw {@link InsufficientCapacityException}, {@code true} otherwise
	 */
	@Override
	public boolean tryPublishEvent(EventTranslator<E> translator) {
		try {
			final long sequence = sequencer.tryNext();
			translateAndPublish(translator, sequence);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** One-argument variant of {@link #publishEvent(EventTranslator)}. */
	@Override
	public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
		final long sequence = sequencer.next();
		translateAndPublish(translator, sequence, arg0);
	}

	/** One-argument variant of {@link #tryPublishEvent(EventTranslator)}. */
	@Override
	public <A> boolean tryPublishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
		try {
			final long sequence = sequencer.tryNext();
			translateAndPublish(translator, sequence, arg0);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Two-argument variant of {@link #publishEvent(EventTranslator)}. */
	@Override
	public <A, B> void publishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1) {
		final long sequence = sequencer.next();
		translateAndPublish(translator, sequence, arg0, arg1);
	}

	/** Two-argument variant of {@link #tryPublishEvent(EventTranslator)}. */
	@Override
	public <A, B> boolean tryPublishEvent(EventTranslatorTwoArg<E, A, B> translator, A arg0, B arg1) {
		try {
			final long sequence = sequencer.tryNext();
			translateAndPublish(translator, sequence, arg0, arg1);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Three-argument variant of {@link #publishEvent(EventTranslator)}. */
	@Override
	public <A, B, C> void publishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2) {
		final long sequence = sequencer.next();
		translateAndPublish(translator, sequence, arg0, arg1, arg2);
	}

	/** Three-argument variant of {@link #tryPublishEvent(EventTranslator)}. */
	@Override
	public <A, B, C> boolean tryPublishEvent(EventTranslatorThreeArg<E, A, B, C> translator, A arg0, B arg1, C arg2) {
		try {
			final long sequence = sequencer.tryNext();
			translateAndPublish(translator, sequence, arg0, arg1, arg2);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Vararg variant of {@link #publishEvent(EventTranslator)}. */
	@Override
	public void publishEvent(EventTranslatorVararg<E> translator, Object... args) {
		final long sequence = sequencer.next();
		translateAndPublish(translator, sequence, args);
	}

	/** Vararg variant of {@link #tryPublishEvent(EventTranslator)}. */
	@Override
	public boolean tryPublishEvent(EventTranslatorVararg<E> translator, Object... args) {
		try {
			final long sequence = sequencer.tryNext();
			translateAndPublish(translator, sequence, args);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Publishes one event per translator, using the whole array as the batch. */
	@Override
	public void publishEvents(EventTranslator<E>[] translators) {
		publishEvents(translators, 0, translators.length);
	}

	/**
	 * Publishes {@code batchSize} events, one per translator, starting at {@code translators[batchStartsAt]}.
	 *
	 * @throws IllegalArgumentException if the batch bounds are negative, exceed the buffer size,
	 *                                  or run past the end of {@code translators}
	 */
	@Override
	public void publishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) {
		checkBounds(translators, batchStartsAt, batchSize);
		final long finalSequence = sequencer.next(batchSize);
		translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
	}

	/** Non-blocking counterpart of {@link #publishEvents(EventTranslator[])}. */
	@Override
	public boolean tryPublishEvents(EventTranslator<E>[] translators) {
		return tryPublishEvents(translators, 0, translators.length);
	}

	/**
	 * Like {@link #publishEvents(EventTranslator[], int, int)} but uses {@code tryNext(batchSize)}.
	 *
	 * @return {@code false} if the sequencer threw {@link InsufficientCapacityException}, {@code true} otherwise
	 * @throws IllegalArgumentException if the batch bounds are invalid
	 */
	@Override
	public boolean tryPublishEvents(EventTranslator<E>[] translators, int batchStartsAt, int batchSize) {
		checkBounds(translators, batchStartsAt, batchSize);
		try {
			final long finalSequence = sequencer.tryNext(batchSize);
			translateAndPublishBatch(translators, batchStartsAt, batchSize, finalSequence);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Publishes one event per element of {@code arg0}, all through the same translator. */
	@Override
	public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0) {
		publishEvents(translator, 0, arg0.length, arg0);
	}

	/**
	 * Publishes {@code batchSize} events; event {@code i} of the batch receives {@code arg0[batchStartsAt + i]}.
	 *
	 * @throws IllegalArgumentException if the batch bounds are invalid for the buffer or for {@code arg0}
	 */
	@Override
	public <A> void publishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize, A[] arg0) {
		checkBounds(arg0, batchStartsAt, batchSize);
		final long finalSequence = sequencer.next(batchSize);
		translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence);
	}

	/** Non-blocking counterpart of {@link #publishEvents(EventTranslatorOneArg, Object[])}. */
	@Override
	public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, A[] arg0) {
		return tryPublishEvents(translator, 0, arg0.length, arg0);
	}

	/**
	 * Like {@link #publishEvents(EventTranslatorOneArg, int, int, Object[])} but uses {@code tryNext(batchSize)}.
	 *
	 * @return {@code false} if the sequencer threw {@link InsufficientCapacityException}, {@code true} otherwise
	 * @throws IllegalArgumentException if the batch bounds are invalid
	 */
	@Override
	public <A> boolean tryPublishEvents(EventTranslatorOneArg<E, A> translator, int batchStartsAt, int batchSize,
			A[] arg0) {
		checkBounds(arg0, batchStartsAt, batchSize);
		try {
			final long finalSequence = sequencer.tryNext(batchSize);
			translateAndPublishBatch(translator, arg0, batchStartsAt, batchSize, finalSequence);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Publishes one event per index of {@code arg0}; the batch size is {@code arg0.length}. */
	@Override
	public <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1) {
		publishEvents(translator, 0, arg0.length, arg0, arg1);
	}

	/**
	 * Two-argument batch publish; both argument arrays are indexed in lock step.
	 *
	 * @throws IllegalArgumentException if the batch bounds are invalid for the buffer or for either array
	 */
	@Override
	public <A, B> void publishEvents(EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize,
			A[] arg0, B[] arg1) {
		checkBounds(arg0, arg1, batchStartsAt, batchSize);
		final long finalSequence = sequencer.next(batchSize);
		translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence);
	}

	/** Non-blocking counterpart of {@link #publishEvents(EventTranslatorTwoArg, Object[], Object[])}. */
	@Override
	public <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, A[] arg0, B[] arg1) {
		return tryPublishEvents(translator, 0, arg0.length, arg0, arg1);
	}

	/**
	 * Like the blocking two-argument batch publish but uses {@code tryNext(batchSize)}.
	 *
	 * @return {@code false} if the sequencer threw {@link InsufficientCapacityException}, {@code true} otherwise
	 * @throws IllegalArgumentException if the batch bounds are invalid
	 */
	@Override
	public <A, B> boolean tryPublishEvents(EventTranslatorTwoArg<E, A, B> translator, int batchStartsAt, int batchSize,
			A[] arg0, B[] arg1) {
		checkBounds(arg0, arg1, batchStartsAt, batchSize);
		try {
			final long finalSequence = sequencer.tryNext(batchSize);
			translateAndPublishBatch(translator, arg0, arg1, batchStartsAt, batchSize, finalSequence);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Publishes one event per index of {@code arg0}; the batch size is {@code arg0.length}. */
	@Override
	public <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1, C[] arg2) {
		publishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
	}

	/**
	 * Three-argument batch publish; all argument arrays are indexed in lock step.
	 *
	 * @throws IllegalArgumentException if the batch bounds are invalid for the buffer or for any array
	 */
	@Override
	public <A, B, C> void publishEvents(EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt,
			int batchSize, A[] arg0, B[] arg1, C[] arg2) {
		checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize);
		final long finalSequence = sequencer.next(batchSize);
		translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence);
	}

	/** Non-blocking counterpart of the three-argument whole-array batch publish. */
	@Override
	public <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, A[] arg0, B[] arg1,
			C[] arg2) {
		return tryPublishEvents(translator, 0, arg0.length, arg0, arg1, arg2);
	}

	/**
	 * Like the blocking three-argument batch publish but uses {@code tryNext(batchSize)}.
	 *
	 * @return {@code false} if the sequencer threw {@link InsufficientCapacityException}, {@code true} otherwise
	 * @throws IllegalArgumentException if the batch bounds are invalid
	 */
	@Override
	public <A, B, C> boolean tryPublishEvents(EventTranslatorThreeArg<E, A, B, C> translator, int batchStartsAt,
			int batchSize, A[] arg0, B[] arg1, C[] arg2) {
		checkBounds(arg0, arg1, arg2, batchStartsAt, batchSize);
		try {
			final long finalSequence = sequencer.tryNext(batchSize);
			translateAndPublishBatch(translator, arg0, arg1, arg2, batchStartsAt, batchSize, finalSequence);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/** Publishes one event per argument row; the batch size is {@code args.length}. */
	@Override
	public void publishEvents(EventTranslatorVararg<E> translator, Object[]... args) {
		publishEvents(translator, 0, args.length, args);
	}

	/**
	 * Vararg batch publish; event {@code i} of the batch receives the row {@code args[batchStartsAt + i]}.
	 *
	 * @throws IllegalArgumentException if the batch bounds are invalid for the buffer or for {@code args}
	 */
	@Override
	public void publishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize, Object[]... args) {
		checkBounds(batchStartsAt, batchSize, args);
		final long finalSequence = sequencer.next(batchSize);
		translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args);
	}

	/** Non-blocking counterpart of {@link #publishEvents(EventTranslatorVararg, Object[][])}. */
	@Override
	public boolean tryPublishEvents(EventTranslatorVararg<E> translator, Object[]... args) {
		return tryPublishEvents(translator, 0, args.length, args);
	}

	/**
	 * Like the blocking vararg batch publish but uses {@code tryNext(batchSize)}.
	 *
	 * @return {@code false} if the sequencer threw {@link InsufficientCapacityException}, {@code true} otherwise
	 * @throws IllegalArgumentException if the batch bounds are invalid
	 */
	@Override
	public boolean tryPublishEvents(EventTranslatorVararg<E> translator, int batchStartsAt, int batchSize,
			Object[]... args) {
		// Use the dedicated Object[][] overload, exactly as the blocking publishEvents variant does.
		checkBounds(batchStartsAt, batchSize, args);
		try {
			final long finalSequence = sequencer.tryNext(batchSize);
			translateAndPublishBatch(translator, batchStartsAt, batchSize, finalSequence, args);
			return true;
		} catch (InsufficientCapacityException e) {
			return false;
		}
	}

	/**
	 * Publishes a previously obtained sequence through the sequencer. Typical manual flow, per the
	 * original note: obtain a sequence, fetch the event for it, set its values, then publish.
	 */
	@Override
	public void publish(long sequence) {
		sequencer.publish(sequence);
	}

	/** Publishes the sequence range {@code lo..hi} through the sequencer. */
	@Override
	public void publish(long lo, long hi) {
		sequencer.publish(lo, hi);
	}

	/** Returns {@code sequencer.remainingCapacity()}. */
	public long remainingCapacity() {
		return sequencer.remainingCapacity();
	}

	/** Validates the batch window against the buffer size and the translator array. */
	private void checkBounds(final EventTranslator<E>[] translators, final int batchStartsAt, final int batchSize) {
		checkBatchSizing(batchStartsAt, batchSize);
		batchOverRuns(translators, batchStartsAt, batchSize);
	}

	/**
	 * Rejects negative batch parameters and batches larger than the ring buffer.
	 *
	 * @throws IllegalArgumentException if either value is negative or {@code batchSize > bufferSize}
	 */
	private void checkBatchSizing(int batchStartsAt, int batchSize) {
		if (batchStartsAt < 0 || batchSize < 0) {
			throw new IllegalArgumentException(
					"Both batchStartsAt and batchSize must be positive but got: batchStartsAt " + batchStartsAt
							+ " and batchSize " + batchSize);
		} else if (batchSize > bufferSize) {
			throw new IllegalArgumentException("The ring buffer cannot accommodate " + batchSize
					+ " it only has space for " + bufferSize + " entities.");
		}
	}

	/** Validates the batch window against the buffer size and one argument array. */
	private <A> void checkBounds(final A[] arg0, final int batchStartsAt, final int batchSize) {
		checkBatchSizing(batchStartsAt, batchSize);
		batchOverRuns(arg0, batchStartsAt, batchSize);
	}

	/** Validates the batch window against the buffer size and two argument arrays. */
	private <A, B> void checkBounds(final A[] arg0, final B[] arg1, final int batchStartsAt, final int batchSize) {
		checkBatchSizing(batchStartsAt, batchSize);
		batchOverRuns(arg0, batchStartsAt, batchSize);
		batchOverRuns(arg1, batchStartsAt, batchSize);
	}

	/** Validates the batch window against the buffer size and three argument arrays. */
	private <A, B, C> void checkBounds(final A[] arg0, final B[] arg1, final C[] arg2, final int batchStartsAt,
			final int batchSize) {
		checkBatchSizing(batchStartsAt, batchSize);
		batchOverRuns(arg0, batchStartsAt, batchSize);
		batchOverRuns(arg1, batchStartsAt, batchSize);
		batchOverRuns(arg2, batchStartsAt, batchSize);
	}

	/** Validates the batch window against the buffer size and the array of argument rows. */
	private void checkBounds(final int batchStartsAt, final int batchSize, final Object[][] args) {
		checkBatchSizing(batchStartsAt, batchSize);
		batchOverRuns(args, batchStartsAt, batchSize);
	}

	/**
	 * Rejects a batch window that extends past the end of {@code arg0}.
	 *
	 * @throws IllegalArgumentException if {@code batchStartsAt + batchSize} exceeds {@code arg0.length}
	 */
	private <A> void batchOverRuns(final A[] arg0, final int batchStartsAt, final int batchSize) {
		// Sum in long arithmetic: as an int, a large batchStartsAt plus batchSize wraps to a negative value,
		// slips past this check, and the batch would then be claimed and published without being translated.
		if ((long) batchStartsAt + batchSize > arg0.length) {
			throw new IllegalArgumentException(
					"A batchSize of: " + batchSize + " with batchStatsAt of: " + batchStartsAt
							+ " will overrun the available number of arguments: " + (arg0.length - batchStartsAt));
		}
	}

	/** Fills the event at {@code sequence} and publishes it; the publish happens even if the translator throws. */
	private void translateAndPublish(EventTranslator<E> translator, long sequence) {
		try {
			translator.translateTo(get(sequence), sequence);
		} finally {
			sequencer.publish(sequence);
		}
	}

	/** One-argument variant; the publish happens even if the translator throws. */
	private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
		try {
			translator.translateTo(get(sequence), sequence, arg0);
		} finally {
			sequencer.publish(sequence);
		}
	}

	/** Two-argument variant; the publish happens even if the translator throws. */
	private <A, B> void translateAndPublish(EventTranslatorTwoArg<E, A, B> translator, long sequence, A arg0, B arg1) {
		try {
			translator.translateTo(get(sequence), sequence, arg0, arg1);
		} finally {
			sequencer.publish(sequence);
		}
	}

	/** Three-argument variant; the publish happens even if the translator throws. */
	private <A, B, C> void translateAndPublish(EventTranslatorThreeArg<E, A, B, C> translator, long sequence, A arg0,
			B arg1, C arg2) {
		try {
			translator.translateTo(get(sequence), sequence, arg0, arg1, arg2);
		} finally {
			sequencer.publish(sequence);
		}
	}

	/** Vararg variant; the publish happens even if the translator throws. */
	private void translateAndPublish(EventTranslatorVararg<E> translator, long sequence, Object... args) {
		try {
			translator.translateTo(get(sequence), sequence, args);
		} finally {
			sequencer.publish(sequence);
		}
	}

	/**
	 * Translates a batch with one translator per event and publishes the whole range.
	 * {@code finalSequence} is the last sequence of the batch, so the first one is
	 * {@code finalSequence - (batchSize - 1)}. The range is published even if a translator throws.
	 */
	private void translateAndPublishBatch(final EventTranslator<E>[] translators, int batchStartsAt,
			final int batchSize, final long finalSequence) {
		final long initialSequence = finalSequence - (batchSize - 1);
		try {
			long sequence = initialSequence;
			final int batchEndsAt = batchStartsAt + batchSize;
			for (int i = batchStartsAt; i < batchEndsAt; i++) {
				final EventTranslator<E> translator = translators[i];
				translator.translateTo(get(sequence), sequence++);
			}
		} finally {
			sequencer.publish(initialSequence, finalSequence);
		}
	}

	/** One-argument batch variant; the whole range is published even if the translator throws. */
	private <A> void translateAndPublishBatch(final EventTranslatorOneArg<E, A> translator, final A[] arg0,
			int batchStartsAt, final int batchSize, final long finalSequence) {
		final long initialSequence = finalSequence - (batchSize - 1);
		try {
			long sequence = initialSequence;
			final int batchEndsAt = batchStartsAt + batchSize;
			for (int i = batchStartsAt; i < batchEndsAt; i++) {
				translator.translateTo(get(sequence), sequence++, arg0[i]);
			}
		} finally {
			sequencer.publish(initialSequence, finalSequence);
		}
	}

	/** Two-argument batch variant; the whole range is published even if the translator throws. */
	private <A, B> void translateAndPublishBatch(final EventTranslatorTwoArg<E, A, B> translator, final A[] arg0,
			final B[] arg1, int batchStartsAt, int batchSize, final long finalSequence) {
		final long initialSequence = finalSequence - (batchSize - 1);
		try {
			long sequence = initialSequence;
			final int batchEndsAt = batchStartsAt + batchSize;
			for (int i = batchStartsAt; i < batchEndsAt; i++) {
				translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i]);
			}
		} finally {
			sequencer.publish(initialSequence, finalSequence);
		}
	}

	/** Three-argument batch variant; the whole range is published even if the translator throws. */
	private <A, B, C> void translateAndPublishBatch(final EventTranslatorThreeArg<E, A, B, C> translator,
			final A[] arg0, final B[] arg1, final C[] arg2, int batchStartsAt, final int batchSize,
			final long finalSequence) {
		final long initialSequence = finalSequence - (batchSize - 1);
		try {
			long sequence = initialSequence;
			final int batchEndsAt = batchStartsAt + batchSize;
			for (int i = batchStartsAt; i < batchEndsAt; i++) {
				translator.translateTo(get(sequence), sequence++, arg0[i], arg1[i], arg2[i]);
			}
		} finally {
			sequencer.publish(initialSequence, finalSequence);
		}
	}

	/** Vararg batch variant; the whole range is published even if the translator throws. */
	private void translateAndPublishBatch(final EventTranslatorVararg<E> translator, int batchStartsAt,
			final int batchSize, final long finalSequence, final Object[][] args) {
		final long initialSequence = finalSequence - (batchSize - 1);
		try {
			long sequence = initialSequence;
			final int batchEndsAt = batchStartsAt + batchSize;
			for (int i = batchStartsAt; i < batchEndsAt; i++) {
				translator.translateTo(get(sequence), sequence++, args[i]);
			}
		} finally {
			sequencer.publish(initialSequence, finalSequence);
		}
	}
}
